package com.gitee.Jmysy.binlog4j.core.dispatcher;

import com.alibaba.druid.DbType;
import com.alibaba.druid.sql.parser.SQLParserUtils;
import com.gitee.Jmysy.binlog4j.core.BinlogClientConfig;
import com.gitee.Jmysy.binlog4j.core.BinlogEventHandlerInvoker;
import com.gitee.Jmysy.binlog4j.core.position.BinlogPosition;
import com.gitee.Jmysy.binlog4j.core.position.BinlogPositionHandler;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Slf4j
public class BinlogEventDispatcher implements BinaryLogClient.EventListener {
    private final Map<Long, TableMapEventData> tableMap = new HashMap<>();

    private final Map<String, BinlogEventHandlerInvoker> eventHandlerMap;

    private final BinlogClientConfig clientConfig;

    private final BinlogPositionHandler binlogPositionHandler;

    public BinlogEventDispatcher(BinlogClientConfig clientConfig, BinlogPositionHandler positionHandler, Map<String, BinlogEventHandlerInvoker> eventHandlerMap) {
        this.clientConfig = clientConfig;
        this.eventHandlerMap = eventHandlerMap;
        this.binlogPositionHandler = positionHandler;
    }


    @Override
    public void onEvent(Event event) {
        EventHeaderV4 headerV4 = event.getHeader();
        EventType eventType = headerV4.getEventType();
        long timestamp = headerV4.getTimestamp();
        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData eventData = event.getData();
            tableMap.put(eventData.getTableId(), eventData);
        } else {
            try {
                Map<String, String> ddl = isDDL(eventType, event);
                if (!ddl.isEmpty()) {
                    long serverId = clientConfig.getServerId();
                    String database = ddl.get("db");
                    String table = ddl.get("table");
                    String sql = ddl.get("sql");
                    log.info("DDLMap======================》"+ddl.toString());

                    this.eventHandlerMap.forEach((handlerKey, eventHandler) -> {
                        eventHandler.invokeDDL(serverId, database, table, sql, timestamp);
                        //删除缓存的表信息
                        eventHandler.removeMap(database,table);
                    });
                }
            }catch (Exception e){
                log.error("DDLException======================》"+e.toString());
                e.printStackTrace();
            }

            EventData eventData = event.getData();
           /* if (null!=eventData&&(eventData instanceof UpdateRowsEventData||eventData instanceof WriteRowsEventData||eventData instanceof DeleteRowsEventData)) {*/
            if(EventType.isRowMutation(eventType)){
                RowMutationEventData rowMutationEventData = new RowMutationEventData(eventData);
                TableMapEventData tableMapEventData = tableMap.get(rowMutationEventData.getTableId());
                if (tableMapEventData != null) {
                    long serverId = clientConfig.getServerId();
                    String database = tableMapEventData.getDatabase();
                    String table = tableMapEventData.getTable();
                        if (eventData instanceof WriteRowsEventData) {
                            this.eventHandlerMap.forEach((handlerKey, eventHandler) -> {
                                eventHandler.invokeInsert(serverId, database, table, rowMutationEventData.getInsertRows(), timestamp);
                            });
                        }
                        if (eventData instanceof UpdateRowsEventData) {
                            this.eventHandlerMap.forEach((handlerKey, eventHandler) -> {
                                eventHandler.invokeUpdate(serverId, database, table, rowMutationEventData.getUpdateRows(), timestamp);
                            });
                        }
                        if (eventData instanceof DeleteRowsEventData) {
                            this.eventHandlerMap.forEach((handlerKey, eventHandler) -> {
                                eventHandler.invokeDelete(serverId, database, table, rowMutationEventData.getDeleteRows(), timestamp);
                            });
                        }
                }
            }
           /* if (null!=eventData&&!(eventData instanceof UpdateRowsEventData||eventData instanceof WriteRowsEventData||eventData instanceof DeleteRowsEventData)) {
                RowMutationEventData rowMutationEventData = new RowMutationEventData(eventData);
                TableMapEventData tableMapEventData = tableMap.get(rowMutationEventData.getTableId());
                if (tableMapEventData != null&&null!=tableMapEventData.getTable()&&null!=tableMapEventData.getDatabase()) {
                    log.info("eventException======================》"+event.toString());
                    log.info("eventException======================》"+event.getData());
                    log.error("eventException======================》"+event.toString());
                    log.error("eventException======================》"+event.getData());
                }

            }*/


            if (EventType.isRowMutation(eventType)&&false) {

                RowMutationEventData rowMutationEventData = new RowMutationEventData(event.getData());
                TableMapEventData tableMapEventData = tableMap.get(rowMutationEventData.getTableId());
                if (tableMapEventData != null) {
                    long serverId = clientConfig.getServerId();
                    String database = tableMapEventData.getDatabase();
                    String table = tableMapEventData.getTable();


                    this.eventHandlerMap.forEach((handlerKey, eventHandler) -> {
                        if (EventType.isUpdate(eventType)) {
                            eventHandler.invokeUpdate(serverId, database, table, rowMutationEventData.getUpdateRows(), timestamp);
                            return;
                        }
                        if (EventType.isDelete(eventType)) {
                            eventHandler.invokeDelete(serverId, database, table, rowMutationEventData.getDeleteRows(), timestamp);
                            return;
                        }
                        if (EventType.isWrite(eventType)) {
                            eventHandler.invokeInsert(serverId, database, table, rowMutationEventData.getInsertRows(), timestamp);
                        }
                    });
                }
            }
        }
        if (clientConfig.getPersistence()) {
            if (binlogPositionHandler != null) {
                if (eventType != EventType.FORMAT_DESCRIPTION) {
                    BinlogPosition binlogPosition = new BinlogPosition();
                    if (EventType.ROTATE == eventType) {
                        binlogPosition.setServerId(clientConfig.getServerId());
                        binlogPosition.setFilename(((RotateEventData) event.getData()).getBinlogFilename());
                        binlogPosition.setPosition(((RotateEventData) event.getData()).getBinlogPosition());
                    } else {
                        binlogPosition = binlogPositionHandler.loadPosition(clientConfig.getServerId());
                        if (binlogPosition != null) {
                            binlogPosition.setPosition(headerV4.getNextPosition());
                        }
                    }
                    binlogPositionHandler.savePosition(binlogPosition);
                }
            }
        }
    }


    @Data
    public static class RowMutationEventData {

        private long tableId;

        private List<Serializable[]> insertRows;

        private List<Serializable[]> deleteRows;

        private List<Map.Entry<Serializable[], Serializable[]>> updateRows;

        public RowMutationEventData(EventData eventData) {

            if (eventData instanceof UpdateRowsEventData) {
                UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
                this.tableId = updateRowsEventData.getTableId();
                this.updateRows = updateRowsEventData.getRows();
                return;
            }

            if (eventData instanceof WriteRowsEventData) {
                WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
                this.tableId = writeRowsEventData.getTableId();
                this.insertRows = writeRowsEventData.getRows();
                return;
            }

            if (eventData instanceof DeleteRowsEventData) {
                DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
                this.tableId = deleteRowsEventData.getTableId();
                this.deleteRows = deleteRowsEventData.getRows();
            }
        }
    }

    public static String removeComments(String input) {
        Pattern pattern = Pattern.compile("/\\*.*?\\*/");
        Matcher matcher = pattern.matcher(input);
        return matcher.replaceAll("");
    }

    public static Map<String, String> isDDL(EventType eventType, Event event) {
        try {
            Map<String, String> map = new HashMap<>();
            if (eventType == EventType.QUERY) {
                QueryEventData data = event.getData();
                if (data != null) {
                    String sql = data.getSql();

                    String output = removeComments(sql).trim().toUpperCase();
                    if (output.contains("ALTER TABLE") || output.contains("DROP") || output.contains("RENAME")) {
                        output=output.replaceAll("`", "").replaceAll(data.getDatabase().toUpperCase() + ".", "");
                        map.put("db", data.getDatabase().toLowerCase());
                        map.put("table", "");
                        map.put("sql", output);
                        List<String> tables = SQLParserUtils.getTables(output, DbType.mysql);

                        if(tables.size()==0){
                            if(output.startsWith("RENAME TABLE")){
                                String s = output.replaceAll("RENAME TABLE", "").replaceAll(";", "");
                                String[] tos = s.split(" TO ");
                                if(tos.length==0){
                                    return map;
                                }else{
                                    if(tos[0].replaceAll(" ","").contains("HOTDB_TEMP_")){
                                        map.put("table", tos[1].toLowerCase().replaceAll(" ",""));
                                    }else{
                                        map.put("table", tos[0].toLowerCase().replaceAll(" ",""));
                                    }
                                    map.put("db", data.getDatabase().toLowerCase().replaceAll(" ",""));

                                    map.put("sql", output);
                                    return map;
                                }
                            }
                        }else {
                            map.put("db", data.getDatabase().toLowerCase());
                            map.put("table", tables.get(0).toLowerCase());
                            map.put("sql", output);
                            return map;
                        }
                    }
                }
            }
            return map;
        }catch (Exception e){
            log.error("isDDLException======================》"+e.toString());
          return new HashMap<>();
        }
    }

}
